/*
 *  +----------------------------------------------------------------------
 *  | sophon [ A FAST GAME FRAMEWORK ]
 *  +----------------------------------------------------------------------
 *  | Copyright (c) 2023-2029 All rights reserved.
 *  +----------------------------------------------------------------------
 *  | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
 *  +----------------------------------------------------------------------
 *  | Author: jqiris <1920624985@qq.com>
 *  +----------------------------------------------------------------------
 */

use std::collections::HashMap;

use dashmap::DashMap;
use once_cell::sync::Lazy;
#[cfg(not(target_os = "linux"))]
use tokio::signal;
#[cfg(target_os = "linux")]
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::RwLock;

use crate::configs::{get_rpc_conf, get_servers_conf};
use crate::fatal;
#[cfg(target_os = "linux")]
use crate::info;
use crate::utils::get_env_default;
use crate::{discover::*, logger::*, rpcs::*, treaty::*};

/// Process-wide [`Launcher`] instance, built lazily on first access and guarded by an
/// async `RwLock` so the module-level helpers below can share it.
static LAUNCHER: Lazy<RwLock<Launcher>> = Lazy::new(|| RwLock::new(Launcher::new()));

/// Registers `creator` as the factory for servers of type `typ` on the global launcher.
///
/// Takes the global launcher's read lock for the duration of the call and forwards to
/// [`Launcher::register_creator`].
pub async fn register_creator(typ: String, creator: ServerCreator) {
    LAUNCHER.read().await.register_creator(typ, creator);
}

/// Starts the configured servers through the global launcher.
///
/// The global launcher's write lock is held until [`Launcher::startup`] has finished.
pub async fn startup() {
    LAUNCHER.write().await.startup().await;
}

/// Shuts down every launched server through the global launcher.
///
/// The global launcher's write lock is held until [`Launcher::shut_down`] has finished.
pub async fn shutdown() {
    LAUNCHER.write().await.shut_down().await;
}

/// Runs the global launcher's supervision loop until it returns.
///
/// The global launcher's read lock is held for as long as [`Launcher::supervision`] runs.
pub async fn supervision() {
    LAUNCHER.read().await.supervision().await;
}

/// Keeps the registered server creators and the servers that have been launched from them.
pub struct Launcher {
    /// Server creators keyed by server type; looked up with `Server::server_type`.
    creators: DashMap<String, ServerCreator>,
    /// Servers that were created and initialized, in the order they were pushed
    /// (`shut_down` re-sorts this list by shutdown weight).
    launched: Vec<ServerEntity>,
}

impl Launcher {
    pub fn new() -> Self {
        Self {
            creators: DashMap::new(),
            launched: Vec::new(),
        }
    }
    pub fn register_creator(&self, typ: String, creator: ServerCreator) {
        match self.creators.contains_key(&typ) {
            true => {
                fatal!("RegisterCreator duplicate, type:{}", typ);
            }
            false => {
                self.creators.insert(typ, creator);
            }
        }
    }

    pub async fn startup(&mut self) {
        //初始化默认rpc
        init_rpc(get_rpc_conf()).await;
        //加载服务器
        let servers = get_servers_conf();
        let (run_mode, run_server) = (
            get_env_default("run_mode", "normal".to_string()),
            get_env_default("run_server", "".to_string()),
        );
        if run_mode == "docker" {
            if run_server.len() < 1 {
                fatal!("请指定运行服务器");
                return;
            }
            match servers.get(&run_server) {
                Some(cfg) => {
                    self.startup_server(cfg).await;
                }
                None => {
                    fatal!("找不到运行服务器");
                    return;
                }
            }
        } else {
            self.startup_all(servers).await;
        }
    }
    async fn startup_server(&mut self, cfg: &Server) {
        match self.creators.get(&cfg.server_type) {
            Some(creator) => match creator(cfg.clone()) {
                Ok(server) => {
                    server.init();
                    self.launched.push(server);
                    self.startup_after_init().await;
                }
                Err(err) => {
                    fatal!("创建服务失败，配置:{:?},err:{}", cfg, err);
                }
            },
            None => {
                fatal!("创建者为空，配置:{:?}", cfg);
            }
        };
    }
    async fn startup_all(&mut self, servers: HashMap<String, Server>) {
        let mut launch_servers = Vec::new();
        for (_, cfg) in servers.iter() {
            let is_launch = cfg.is_launch.unwrap_or(false);
            if is_launch {
                launch_servers.push(cfg);
            }
        }
        launch_servers.sort_by(|a, b| {
            let (mut wa, mut wb) = (0, 0);
            if let Some(v) = a.launch_weight {
                wa = v;
            }
            if let Some(v) = b.launch_weight {
                wb = v;
            }
            wa.cmp(&wb)
        });
        for cfg in launch_servers {
            match self.creators.get(&cfg.server_type) {
                Some(creator) => match creator(cfg.clone()) {
                    Ok(server) => {
                        server.init();
                        self.launched.push(server);
                    }
                    Err(err) => {
                        fatal!("创建服务失败，配置:{:?},err:{}", cfg, err);
                        return;
                    }
                },
                None => {
                    fatal!("创建者为空，配置:{:?}", cfg);
                    return;
                }
            };
        }
        self.startup_after_init().await;
    }
    pub async fn startup_after_init(&self) {
        for server in self.launched.iter() {
            server.after_init().await;
        }
    }
    pub async fn shut_down(&mut self) {
        //关闭服务
        self.launched.sort_by(|a, b| {
            let (mut wa, mut wb) = (0, 0);
            if let Some(v) = a.server_cfg().shut_weight {
                wa = v;
            }
            if let Some(v) = b.server_cfg().shut_weight {
                wb = v;
            }
            wa.cmp(&wb)
        });
        for server in self.launched.iter() {
            server.before_shutdown().await;
        }
        for server in self.launched.iter() {
            server.shutdown().await;
        }
        //关闭默认rpc
        close_rpc();
    }

    //监控
    #[cfg(target_os = "linux")]
    pub async fn supervision(&self) {
        init_server_list().await;
        let mut server_steam = get_server_steam().await.unwrap();
        let mut data_steam = get_data_steam().await.unwrap();
        let mut interrupt = signal(SignalKind::interrupt()).unwrap();
        let mut terminate = signal(SignalKind::terminate()).unwrap();
        loop {
            tokio::select! {
                _sg = interrupt.recv() =>{
                    info!("receive the shutdown interrupt signal");
                    break;
                },
                _sg = terminate.recv()  =>{
                    info!("receive the shutdown terminate signal");
                    break;
                },
                msg = server_steam.message()=>{
                    if let Some(resp) = msg.unwrap() {
                        deal_server_stream(resp).await;
                    }
                },
                msg = data_steam.message()=>{
                    if let Some(resp) = msg.unwrap() {
                        deal_data_stream(resp).await;
                    }
                }
            }
        }
    }
    #[cfg(not(target_os = "linux"))]
    pub async fn supervision(&self) {
        init_server_list().await;
        let mut server_steam = get_server_steam().await.unwrap();
        let mut data_steam = get_data_steam().await.unwrap();
        loop {
            tokio::select! {
                sg=signal::ctrl_c() =>{
                    if let Err(err) = sg{
                        fatal!("Unable to listen for shutdown signal: {}", err);
                    }
                    break;
                },
                msg = server_steam.message()=>{
                    if let Some(resp) = msg.unwrap() {
                        deal_server_stream(resp).await;
                    }
                },
                msg = data_steam.message()=>{
                    if let Some(resp) = msg.unwrap() {
                        deal_data_stream(resp).await;
                    }
                }
            }
        }
    }
}
